RocketMQ网络模型源码解析(2)

您所在的位置:网站首页 rocketmq 数据存储 RocketMQ网络模型源码解析(2)

RocketMQ网络模型源码解析(2)

2023-04-06 05:25| 来源: 网络整理| 查看: 265

RocketMQ源码版本4.9.11. 服务端初始化

腊八粥:RocketMQ网络模型源码解析(1)简介了RocketMQ的线程模型,其中业务线程(M2)是根据RequestCode获取对应的Processor及线程池,RequestCode对应的Processor及线程池的对应关系是在服务初始化的时候完成的。

(1)processor注册

public void registerProcessor() { ..... //消息发送 this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); //写入 Consumer 消费失败的消息(CONSUMER_SEND_MSG_BACK) this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); ....... } --------------------------------------------------------- //processor注册方法 public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) { ExecutorService executorThis = executor; if (null == executor) { executorThis = this.publicExecutor; } Pair pair = new Pair(processor, executorThis); this.processorTable.put(requestCode, pair); } ------------------------------------------------------- HashMap processorTable = new HashMap(64);

fastRemotingServer和remotingServer只是端口不一样,fastRemotingServer的端口是在remotingServer端口(默认值:10911)的基础上-2(默认值:10909)。这两个端口,是服务端用来接受客户端连接的监听端口。fastRemotingServer,针对大量数据发送的时候,若网络阻塞,可以启用其他端口,提高发送效率。

(2)线程隔离

Pair存储Processor及对应的线程池,每个Processor的线程池都不一样,达到线程隔离的目的。例如:发送的线程池和消费的线程池不一样,即使生产者发送线程出了问题,并不会影响消费者消费,同时根据不通Processor可以针对线程池配置不同的参数,既能实现线程的高效利用,又能避免创建过多无用线程。

2.创建长链接

RocketMQ客户端或服务端初始化,并没有建立连接,只有在发送消息(同步/异步/Oneway方式)的时候才会去建立连接,相关代码如下:

长连接存储map,每个addr对应一个长连接的包装对象ConcurrentMap channelTables。这个addr,对应客户端/服务端/NameServer来说,地址都是不一样的。

if (createNewConnection) { //创建长连接 ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); log.info("createChannel: begin to connect remote host[{}] asynchronously", addr); cw = new ChannelWrapper(channelFuture); //长连接创建完成,放入channelTables(每个addr对应一个长连接) this.channelTables.put(addr, cw); }3. 通信协议

在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。

同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

Header字段类型Request说明Response说明codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0表示成功,非0则表示各种错误languageLanguageCode请求方实现的语言应答方实现的语言versionint请求方程序的版本应答方程序的版本opaqueint相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回flagint区分是普通RPC还是onewayRPC得标志区分是普通RPC还是onewayRPC得标志remarkString传输自定义文本信息传输自定义文本信息extFieldsHashMap请求自定义扩展信息响应自定义扩展信息3.1 encode方法public ByteBuffer encode() { // 1> header length size int length = 4; // 2> header data length byte[] headerData = this.headerEncode(); length += headerData.length; // 3> body data length if (this.body != null) { length += body.length; } //这是因为在消息总长度的计算中没有将存储头部长度的4个字节计算在内 ByteBuffer result = ByteBuffer.allocate(4 + length); //将消息总长度放入ByteBuffer result.putInt(length); //将消息头数据放入ByteBuffer result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); // header data result.put(headerData); // body data; if (this.body != null) { //将消息主体放入ByteBuffer result.put(this.body); } //重置ByteBuffer的position位置 result.flip(); return result; }encode

RemotingCommand的encode方法,可知传输内容主要可以分为以下4部分:

(1) 消息长度:总长度,四个字节存储,占用一个int类型;

(2) 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;

(3) 消息头数据:经过序列化后的消息头数据;

(4) 消息主体数据:消息主体的二进制字节数据内容;

3.2 decode方法public static RemotingCommand decode(final ByteBuffer byteBuffer) { //获取byteBuffer的总长度 int length = byteBuffer.limit(); //获取前4个字节,组装int类型,该长度为总长度 int oriHeaderLen = byteBuffer.getInt(); //获取消息头的长度,这里和0xFFFFFF做与运算,编码时候的长度即为24位 int headerLength = getHeaderLength(oriHeaderLen); byte[] headerData = new byte[headerLength]; byteBuffer.get(headerData); RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen)); //endoce方法+4,这里减少4(头部存储长度) int bodyLength = length - 4 - headerLength; byte[] bodyData = null; if (bodyLength > 0) { bodyData = new byte[bodyLength]; byteBuffer.get(bodyData); } cmd.body = bodyData; return cmd; }

encode和decode方法对应着看,头部的编码和解码,body的编码和解码逻辑,就比较简单。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3